<!DOCTYPE html>



  


<html class="theme-next muse use-motion" lang="zh-Hans">
<head>
  <meta charset="UTF-8"/>
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1"/>
<meta name="theme-color" content="#222">









<meta http-equiv="Cache-Control" content="no-transform" />
<meta http-equiv="Cache-Control" content="no-siteapp" />
















  
  
  <link href="/lib/fancybox/source/jquery.fancybox.css?v=2.1.5" rel="stylesheet" type="text/css" />







<link href="/lib/font-awesome/css/font-awesome.min.css?v=4.6.2" rel="stylesheet" type="text/css" />

<link href="/css/main.css?v=5.1.4" rel="stylesheet" type="text/css" />


  <link rel="apple-touch-icon" sizes="180x180" href="/images/apple-touch-icon-next.png?v=5.1.4">


  <link rel="icon" type="image/png" sizes="32x32" href="/images/favicon-32x32-next.png?v=5.1.4">


  <link rel="icon" type="image/png" sizes="16x16" href="/images/favicon-16x16-next.png?v=5.1.4">


  <link rel="mask-icon" href="/images/logo.svg?v=5.1.4" color="#222">





  <meta name="keywords" content="Hexo, NexT" />










<meta name="description" content="本文紧接着 源码分析 RocketMQ DLedger(多副本) 之日志追加流程  ，继续 Leader 处理客户端 append 的请求流程中最至关重要的一环：日志复制。 DLedger 多副本的日志转发由 DLedgerEntryPusher 实现，接下来将对其进行详细介绍。  温馨提示：由于本篇幅较长，为了更好的理解其实现，大家可以带着如下疑问来通读本篇文章：1、raft 协议中有一个非常重">
<meta property="og:type" content="article">
<meta property="og:title" content="源码分析 RocketMQ DLedger(多副本) 之日志复制(传播)">
<meta property="og:url" content="https://www.codingw.net/posts/91680207.html">
<meta property="og:site_name" content="中间件兴趣圈">
<meta property="og:description" content="本文紧接着 源码分析 RocketMQ DLedger(多副本) 之日志追加流程  ，继续 Leader 处理客户端 append 的请求流程中最至关重要的一环：日志复制。 DLedger 多副本的日志转发由 DLedgerEntryPusher 实现，接下来将对其进行详细介绍。  温馨提示：由于本篇幅较长，为了更好的理解其实现，大家可以带着如下疑问来通读本篇文章：1、raft 协议中有一个非常重">
<meta property="og:locale">
<meta property="og:image" content="https://img-blog.csdnimg.cn/2019091421331888.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20190914213606209.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20190914215700841.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20190914220450532.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20190914220604630.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20190914221601709.png">
<meta property="article:published_time" content="2020-10-18T12:12:05.000Z">
<meta property="article:modified_time" content="2021-04-26T12:08:28.592Z">
<meta property="article:author" content="中间件兴趣圈">
<meta property="article:tag" content="中间件">
<meta name="twitter:card" content="summary">
<meta name="twitter:image" content="https://img-blog.csdnimg.cn/2019091421331888.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">



<script type="text/javascript" id="hexo.configurations">
  var NexT = window.NexT || {};
  var CONFIG = {
    root: '',
    scheme: 'Muse',
    version: '5.1.4',
    sidebar: {"position":"left","display":"post","offset":12,"b2t":false,"scrollpercent":false,"onmobile":false},
    fancybox: true,
    tabs: true,
    motion: {"enable":true,"async":false,"transition":{"post_block":"fadeIn","post_header":"slideDownIn","post_body":"slideDownIn","coll_header":"slideLeftIn","sidebar":"slideUpIn"}},
    duoshuo: {
      userId: '0',
      author: '博主'
    },
    algolia: {
      applicationID: '',
      apiKey: '',
      indexName: '',
      hits: {"per_page":10},
      labels: {"input_placeholder":"Search for Posts","hits_empty":"We didn't find any results for the search: ${query}","hits_stats":"${hits} results found in ${time} ms"}
    }
  };
</script>



  <link rel="canonical" href="https://www.codingw.net/posts/91680207.html"/>





  <title>源码分析 RocketMQ DLedger(多副本) 之日志复制(传播) | 中间件兴趣圈</title>
  








<meta name="generator" content="Hexo 5.4.0"></head>

<body itemscope itemtype="http://schema.org/WebPage" lang="zh-Hans">

  
  
    
  

  <div class="container sidebar-position-left page-post-detail">
    <div class="headband"></div>

    <header id="header" class="header" itemscope itemtype="http://schema.org/WPHeader">
      <div class="header-inner"><div class="site-brand-wrapper">
  <div class="site-meta ">
    

    <div class="custom-logo-site-title">
      <a href="/"  class="brand" rel="start">
        <span class="logo-line-before"><i></i></span>
        <span class="site-title">中间件兴趣圈</span>
        <span class="logo-line-after"><i></i></span>
      </a>
    </div>
      
        <p class="site-subtitle">微信搜『中间件兴趣圈』，回复『Java』获取200本优质电子书</p>
      
  </div>

  <div class="site-nav-toggle">
    <button>
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
    </button>
  </div>
</div>

<nav class="site-nav">
  

  
    <ul id="menu" class="menu">
      
        
        <li class="menu-item menu-item-home">
          <a href="/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-home"></i> <br />
            
            首页
          </a>
        </li>
      
        
        <li class="menu-item menu-item-categories">
          <a href="/categories/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-question-circle"></i> <br />
            
            分类
          </a>
        </li>
      
        
        <li class="menu-item menu-item-archives">
          <a href="/archives/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-question-circle"></i> <br />
            
            归档
          </a>
        </li>
      

      
    </ul>
  

  
</nav>



 </div>
    </header>

    <main id="main" class="main">
      <div class="main-inner">
        <div class="content-wrap">
          <div id="content" class="content">
            

  <div id="posts" class="posts-expand">
    

  

  
  
  

  <article class="post post-type-normal" itemscope itemtype="http://schema.org/Article">
  
  
  
  <div class="post-block">
    <link itemprop="mainEntityOfPage" href="https://www.codingw.net/posts/91680207.html">

    <span hidden itemprop="author" itemscope itemtype="http://schema.org/Person">
      <meta itemprop="name" content="">
      <meta itemprop="description" content="">
      <meta itemprop="image" content="/images/avatar.gif">
    </span>

    <span hidden itemprop="publisher" itemscope itemtype="http://schema.org/Organization">
      <meta itemprop="name" content="中间件兴趣圈">
    </span>

    
      <header class="post-header">

        
        
          <h1 class="post-title" itemprop="name headline">源码分析 RocketMQ DLedger(多副本) 之日志复制(传播)</h1>
        

        <div class="post-meta">
          <span class="post-time">
            
              <span class="post-meta-item-icon">
                <i class="fa fa-calendar-o"></i>
              </span>
              
                <span class="post-meta-item-text">发表于</span>
              
              <time title="创建于" itemprop="dateCreated datePublished" datetime="2020-10-18T20:12:05+08:00">
                2020-10-18
              </time>
            

            

            
          </span>

          
            <span class="post-category" >
            
              <span class="post-meta-divider">|</span>
            
              <span class="post-meta-item-icon">
                <i class="fa fa-folder-o"></i>
              </span>
              
                <span class="post-meta-item-text">分类于</span>
              
              
                <span itemprop="about" itemscope itemtype="http://schema.org/Thing">
                  <a href="/categories/rocketmq/" itemprop="url" rel="index">
                    <span itemprop="name">rocketmq</span>
                  </a>
                </span>

                
                
              
            </span>
          

          
            
          

          
          
             <span id="/posts/91680207.html" class="leancloud_visitors" data-flag-title="源码分析 RocketMQ DLedger(多副本) 之日志复制(传播)">
               <span class="post-meta-divider">|</span>
               <span class="post-meta-item-icon">
                 <i class="fa fa-eye"></i>
               </span>
               
                 <span class="post-meta-item-text">阅读次数&#58;</span>
               
                 <span class="leancloud-visitors-count"></span>
             </span>
          

          
            <span class="post-meta-divider">|</span>
            <span class="page-pv"><i class="fa fa-file-o"></i>
            <span class="busuanzi-value" id="busuanzi_value_page_pv" ></span>次
            </span>
          

          

          

        </div>
      </header>
    

    
    
    
    <div class="post-body" itemprop="articleBody">

      
      

      
        <div id="vip-container"><p>本文紧接着 源码分析 RocketMQ DLedger(多副本) 之日志追加流程  ，继续 Leader 处理客户端 append 的请求流程中最至关重要的一环：日志复制。</p>
<p>DLedger 多副本的日志转发由 DLedgerEntryPusher 实现，接下来将对其进行详细介绍。</p>
<blockquote>
<p>温馨提示：由于本篇幅较长，为了更好的理解其实现，大家可以带着如下疑问来通读本篇文章：<br>1、raft 协议中有一个非常重要的概念：已提交日志序号，该如何实现。<br>2、客户端向 DLedger 集群发送一条日志，必须得到集群中大多数节点的认可才能被认为写入成功。<br>3、raft 协议中追加、提交两个动作如何实现。</p>
</blockquote>
<p>@<a href="%E6%9C%AC%E8%8A%82%E7%9B%AE%E5%BD%95">TOC</a></p>
<p>日志复制(日志转发)由 DLedgerEntryPusher 实现，具体类图如下：<br><img src="https://img-blog.csdnimg.cn/2019091421331888.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>主要由如下4个类构成：</p>
<ul>
<li>DLedgerEntryPusher<br>DLedger 日志转发与处理核心类，该内会启动如下3个对象，其分别对应一个线程。</li>
<li>EntryHandler<br>日志接收处理线程，当节点为从节点时激活。</li>
<li>QuorumAckChecker<br>日志追加ACK投票处理线程，当前节点为主节点时激活。</li>
<li>EntryDispatcher<br>日志转发线程，当前节点为主节点时追加。</li>
</ul>
<p>接下来我们将详细介绍上述4个类，从而揭晓日志复制的核心实现原理。</p>
<h2 id="1、DLedgerEntryPusher"><a href="#1、DLedgerEntryPusher" class="headerlink" title="1、DLedgerEntryPusher"></a>1、DLedgerEntryPusher</h2><h3 id="1-1-核心类图"><a href="#1-1-核心类图" class="headerlink" title="1.1 核心类图"></a>1.1 核心类图</h3><p><img src="https://img-blog.csdnimg.cn/20190914213606209.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>DLedger 多副本日志推送的核心实现类，里面会创建 EntryDispatcher、QuorumAckChecker、EntryHandler 三个核心线程。其核心属性如下：</p>
<ul>
<li>DLedgerConfig dLedgerConfig<br>多副本相关配置。</li>
<li>DLedgerStore dLedgerStore<br>存储实现类。</li>
<li>MemberState memberState<br>节点状态机。</li>
<li>DLedgerRpcService dLedgerRpcService<br>RPC 服务实现类，用于集群内的其他节点进行网络通讯。</li>
<li>Map&lt;Long, ConcurrentMap&lt;String, Long&gt;&gt; peerWaterMarksByTerm<br>每个节点基于投票轮次的当前水位线标记。键值为投票轮次，值为 ConcurrentMap&lt;String/** 节点id*/, Long/** 节点对应的日志序号*/&gt;。</li>
<li>Map&lt;Long, ConcurrentMap&lt;Long, TimeoutFuture<AppendEntryResponse>&gt;&gt; pendingAppendResponsesByTerm<br>用于存放追加请求的响应结果(Future模式)。</li>
<li>EntryHandler entryHandler<br>从节点上开启的线程，用于接收主节点的 push 请求（append、commit、append）。</li>
<li>QuorumAckChecker quorumAckChecker<br>主节点上的追加请求投票器。</li>
<li>Map&lt;String, EntryDispatcher&gt; dispatcherMap<br>主节点日志请求转发器，向从节点复制消息等。</li>
</ul>
<p>接下来介绍一下其核心方法的实现。</p>
<h3 id="1-2-构造方法"><a href="#1-2-构造方法" class="headerlink" title="1.2 构造方法"></a>1.2 构造方法</h3><figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="title">DLedgerEntryPusher</span><span class="params">(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore,</span></span></span><br><span class="line"><span class="function"><span class="params">    DLedgerRpcService dLedgerRpcService)</span> </span>&#123;</span><br><span class="line">    <span class="keyword">this</span>.dLedgerConfig = dLedgerConfig;</span><br><span class="line">    <span class="keyword">this</span>.memberState = memberState;</span><br><span class="line">    <span class="keyword">this</span>.dLedgerStore = dLedgerStore;</span><br><span class="line">    <span class="keyword">this</span>.dLedgerRpcService = dLedgerRpcService;</span><br><span class="line">    <span class="keyword">for</span> (String peer : memberState.getPeerMap().keySet()) &#123;</span><br><span class="line">        <span class="keyword">if</span> (!peer.equals(memberState.getSelfId())) &#123;</span><br><span class="line">            dispatcherMap.put(peer, <span class="keyword">new</span> EntryDispatcher(peer, logger));</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>构造方法的重点是会根据集群内的节点，依次构建对应的 EntryDispatcher 对象。</p>
<h3 id="1-3-startup"><a href="#1-3-startup" class="headerlink" title="1.3 startup"></a>1.3 startup</h3><p>DLedgerEntryPusher#startup</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">startup</span><span class="params">()</span> </span>&#123;</span><br><span class="line">    entryHandler.start();</span><br><span class="line">    quorumAckChecker.start();</span><br><span class="line">    <span class="keyword">for</span> (EntryDispatcher dispatcher : dispatcherMap.values()) &#123;</span><br><span class="line">        dispatcher.start();</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>依次启动 EntryHandler、QuorumAckChecker 与 EntryDispatcher 线程。</p>
<blockquote>
<p>备注：DLedgerEntryPusher 的其他核心方法在详细分析其日志复制原理的过程中会一一介绍。</p>
</blockquote>
<p>接下来将从 EntryDispatcher、QuorumAckChecker、EntryHandler 来阐述 RocketMQ DLedger(多副本)的实现原理。</p>
<h2 id="2、EntryDispatcher-详解"><a href="#2、EntryDispatcher-详解" class="headerlink" title="2、EntryDispatcher 详解"></a>2、EntryDispatcher 详解</h2><h3 id="2-1-核心类图"><a href="#2-1-核心类图" class="headerlink" title="2.1 核心类图"></a>2.1 核心类图</h3><p><img src="https://img-blog.csdnimg.cn/20190914215700841.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>其核心属性如下。</p>
<ul>
<li>AtomicReference&lt;PushEntryRequest.Type&gt; type = new AtomicReference&lt;&gt;(PushEntryRequest.Type.COMPARE)<br>向从节点发送命令的类型，可选值：PushEntryRequest.Type.COMPARE、TRUNCATE、APPEND、COMMIT，下面详细说明。</li>
<li>long lastPushCommitTimeMs = -1<br>上一次发送提交类型的时间戳。</li>
<li>String peerId<br>目标节点ID。</li>
<li>long compareIndex = -1<br>已完成比较的日志序号。</li>
<li>long writeIndex = -1<br>已写入的日志序号。</li>
<li>int maxPendingSize = 1000<br>允许的最大挂起日志数量。</li>
<li>long term = -1<pre><code> Leader 节点当前的投票轮次。
</code></pre>
</li>
<li>String leaderId = null<br>Leader 节点ID。</li>
<li>long lastCheckLeakTimeMs = System.currentTimeMillis()<br>上次检测泄漏的时间，所谓的泄漏，就是看挂起的日志请求数量是否查过了 maxPendingSize 。</li>
<li>ConcurrentMap&lt;Long, Long&gt; pendingMap = new ConcurrentHashMap&lt;&gt;()<br>记录日志的挂起时间，key：日志的序列(entryIndex)，value：挂起时间戳。</li>
<li>Quota quota = new Quota(dLedgerConfig.getPeerPushQuota())<br>配额。</li>
</ul>
<h3 id="2-2-Push-请求类型"><a href="#2-2-Push-请求类型" class="headerlink" title="2.2 Push 请求类型"></a>2.2 Push 请求类型</h3><p>DLedger 主节点向从从节点复制日志总共定义了4类请求类型，其枚举类型为 PushEntryRequest.Type，其值分别为 COMPARE、TRUNCATE、APPEND、COMMIT。</p>
<ul>
<li>COMPARE<br>如果 Leader 发生变化，新的 Leader 需要与他的从节点的日志条目进行比较，以便截断从节点多余的数据。 </li>
<li>TRUNCATE<br>如果 Leader 通过索引完成日志对比，则 Leader 将发送  TRUNCATE 给它的从节点。</li>
<li>APPEND<br>将日志条目追加到从节点。</li>
<li>COMMIT<br>通常，leader 会将提交的索引附加到 append 请求，但是如果 append 请求很少且分散，leader 将发送一个单独的请求来通知从节点提交的索引。</li>
</ul>
<p>对主从节点的请求类型有了一个初步的认识后，我们将从 EntryDispatcher 的业务处理入口 doWork 方法开始讲解。</p>
<span id="more"></span>

<h3 id="2-3-doWork-方法详解"><a href="#2-3-doWork-方法详解" class="headerlink" title="2.3 doWork 方法详解"></a>2.3 doWork 方法详解</h3><figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">doWork</span><span class="params">()</span> </span>&#123;</span><br><span class="line">    <span class="keyword">try</span> &#123;</span><br><span class="line">        <span class="keyword">if</span> (!checkAndFreshState()) &#123;                                            <span class="comment">// @1</span></span><br><span class="line">            waitForRunning(<span class="number">1</span>);</span><br><span class="line">            <span class="keyword">return</span>;</span><br><span class="line">        &#125;</span><br><span class="line"></span><br><span class="line">        <span class="keyword">if</span> (type.get() == PushEntryRequest.Type.APPEND) &#123;   <span class="comment">// @2</span></span><br><span class="line">            doAppend();</span><br><span class="line">        &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">            doCompare();                                                           <span class="comment">// @3</span></span><br><span class="line">        &#125;</span><br><span class="line">        waitForRunning(<span class="number">1</span>);</span><br><span class="line">    &#125; <span class="keyword">catch</span> (Throwable t) &#123;</span><br><span class="line">        DLedgerEntryPusher.logger.error(<span class="string">&quot;[Push-&#123;&#125;]Error in &#123;&#125; writeIndex=&#123;&#125; compareIndex=&#123;&#125;&quot;</span>, peerId, getName(), writeIndex, compareIndex, t);</span><br><span class="line">        DLedgerUtils.sleep(<span class="number">500</span>);</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：检查状态，是否可以继续发送 append 或 compare。</p>
<p>代码@2：如果推送类型为APPEND，主节点向从节点传播消息请求。</p>
<p>代码@3：主节点向从节点发送对比数据差异请求（当一个新节点被选举成为主节点时，往往这是第一步）。</p>
<h4 id="2-3-1-checkAndFreshState-详解"><a href="#2-3-1-checkAndFreshState-详解" class="headerlink" title="2.3.1 checkAndFreshState 详解"></a>2.3.1 checkAndFreshState 详解</h4><p>EntryDispatcher#checkAndFreshState</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> <span class="keyword">boolean</span> <span class="title">checkAndFreshState</span><span class="params">()</span> </span>&#123;</span><br><span class="line">    <span class="keyword">if</span> (!memberState.isLeader()) &#123;     <span class="comment">// @1</span></span><br><span class="line">        <span class="keyword">return</span> <span class="keyword">false</span>;</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="keyword">if</span> (term != memberState.currTerm() || leaderId == <span class="keyword">null</span> || !leaderId.equals(memberState.getLeaderId())) &#123;     <span class="comment">// @2</span></span><br><span class="line">        <span class="keyword">synchronized</span> (memberState) &#123;</span><br><span class="line">            <span class="keyword">if</span> (!memberState.isLeader()) &#123;</span><br><span class="line">                <span class="keyword">return</span> <span class="keyword">false</span>;</span><br><span class="line">            &#125;</span><br><span class="line">            PreConditions.check(memberState.getSelfId().equals(memberState.getLeaderId()), DLedgerResponseCode.UNKNOWN);</span><br><span class="line">            term = memberState.currTerm();</span><br><span class="line">            leaderId = memberState.getSelfId();</span><br><span class="line">            changeState(-<span class="number">1</span>, PushEntryRequest.Type.COMPARE);</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="keyword">return</span> <span class="keyword">true</span>;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：如果节点的状态不是主节点，则直接返回 false。则结束 本次 doWork 方法。因为只有主节点才需要向从节点转发日志。</p>
<p>代码@2：如果当前节点状态是主节点，但当前的投票轮次与状态机轮次或 leaderId 还未设置，或 leaderId 与状态机的 leaderId 不相等，这种情况通常是集群触发了重新选举，设置其term、leaderId与状态机同步，即将发送COMPARE 请求。</p>
<p>接下来看一下 changeState (改变状态)。</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> <span class="keyword">synchronized</span> <span class="keyword">void</span> <span class="title">changeState</span><span class="params">(<span class="keyword">long</span> index, PushEntryRequest.Type target)</span> </span>&#123;</span><br><span class="line">    logger.info(<span class="string">&quot;[Push-&#123;&#125;]Change state from &#123;&#125; to &#123;&#125; at &#123;&#125;&quot;</span>, peerId, type.get(), target, index);</span><br><span class="line">    <span class="keyword">switch</span> (target) &#123;</span><br><span class="line">        <span class="keyword">case</span> APPEND:      <span class="comment">// @1</span></span><br><span class="line">            compareIndex = -<span class="number">1</span>;</span><br><span class="line">            updatePeerWaterMark(term, peerId, index);</span><br><span class="line">            quorumAckChecker.wakeup();</span><br><span class="line">            writeIndex = index + <span class="number">1</span>;</span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">        <span class="keyword">case</span> COMPARE:    <span class="comment">// @2</span></span><br><span class="line">            <span class="keyword">if</span> (<span class="keyword">this</span>.type.compareAndSet(PushEntryRequest.Type.APPEND, PushEntryRequest.Type.COMPARE)) &#123;</span><br><span class="line">                compareIndex = -<span class="number">1</span>;</span><br><span class="line">                pendingMap.clear();</span><br><span class="line">            &#125;</span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">        <span class="keyword">case</span> TRUNCATE:     <span class="comment">// @3</span></span><br><span class="line">            compareIndex = -<span class="number">1</span>;</span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">        <span class="keyword">default</span>:</span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">    &#125;</span><br><span class="line">    type.set(target);</span><br><span class="line">&#125; </span><br></pre></td></tr></table></figure>
<p>代码@1：如果将目标类型设置为 append，则重置 compareIndex ，并设置 writeIndex 为当前 index 加1。</p>
<p>代码@2：如果将目标类型设置为 COMPARE，则重置 compareIndex 为负一，接下将向各个从节点发送 COMPARE 请求类似，并清除已挂起的请求。</p>
<p>代码@3：如果将目标类型设置为 TRUNCATE，则重置 compareIndex 为负一。</p>
<p>接下来具体来看一下 APPEND、COMPARE、TRUNCATE 等请求。</p>
<h4 id="2-3-2-append-请求详解"><a href="#2-3-2-append-请求详解" class="headerlink" title="2.3.2 append 请求详解"></a>2.3.2 append 请求详解</h4><p>EntryDispatcher#doAppend</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> <span class="keyword">void</span> <span class="title">doAppend</span><span class="params">()</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">    <span class="keyword">while</span> (<span class="keyword">true</span>) &#123;</span><br><span class="line">        <span class="keyword">if</span> (!checkAndFreshState()) &#123;                                                 <span class="comment">// @1</span></span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">        &#125;</span><br><span class="line">        <span class="keyword">if</span> (type.get() != PushEntryRequest.Type.APPEND) &#123;        <span class="comment">// @2</span></span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">        &#125;</span><br><span class="line">        <span class="keyword">if</span> (writeIndex &gt; dLedgerStore.getLedgerEndIndex()) &#123;    <span class="comment">// @3</span></span><br><span class="line">            doCommit();</span><br><span class="line">            doCheckAppendResponse();</span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">        &#125;</span><br><span class="line">        <span class="keyword">if</span> (pendingMap.size() &gt;= maxPendingSize || (DLedgerUtils.elapsed(lastCheckLeakTimeMs) &gt; <span class="number">1000</span>)) &#123;     <span class="comment">// @4</span></span><br><span class="line">            <span class="keyword">long</span> peerWaterMark = getPeerWaterMark(term, peerId);</span><br><span class="line">            <span class="keyword">for</span> (Long index : pendingMap.keySet()) &#123;</span><br><span class="line">                <span class="keyword">if</span> (index &lt; peerWaterMark) &#123;</span><br><span class="line">                    pendingMap.remove(index);</span><br><span class="line">                &#125;</span><br><span class="line">            &#125;</span><br><span class="line">            lastCheckLeakTimeMs = System.currentTimeMillis();</span><br><span class="line">        &#125;</span><br><span class="line">        <span class="keyword">if</span> (pendingMap.size() &gt;= maxPendingSize) &#123;    <span class="comment">// @5</span></span><br><span class="line">            doCheckAppendResponse();</span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">        &#125;</span><br><span class="line">        doAppendInner(writeIndex);                               <span class="comment">// @6</span></span><br><span class="line">        writeIndex++;</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：检查状态，已经在上面详细介绍。</p>
<p>代码@2：如果请求类型不为 APPEND，则退出，结束本轮 doWork 方法执行。</p>
<p>代码@3：writeIndex 表示当前追加到从该节点的序号，通常情况下主节点向从节点发送 append 请求时，会附带主节点的已提交指针，但如何 append 请求发不那么频繁，writeIndex 大于 leaderEndIndex 时（由于pending请求超过其 pending 请求的队列长度（默认为1w)，时，会阻止数据的追加，此时有可能出现 writeIndex 大于 leaderEndIndex 的情况，此时单独发送 COMMIT 请求。</p>
<p>代码@4：检测 pendingMap(挂起的请求数量)是否发送泄漏，即挂起队列中容量是否超过允许的最大挂起阀值。获取当前节点关于本轮次的当前水位线(已成功 append 请求的日志序号)，如果发现正在挂起请求的日志序号小于水位线，则丢弃。</p>
<p>代码@5：如果挂起的请求（等待从节点追加结果）大于 maxPendingSize 时，检查并追加一次 append 请求。</p>
<p>代码@6：具体的追加请求。</p>
<h5 id="2-3-2-1-doCommit-发送提交请求"><a href="#2-3-2-1-doCommit-发送提交请求" class="headerlink" title="2.3.2.1 doCommit 发送提交请求"></a>2.3.2.1 doCommit 发送提交请求</h5><p>EntryDispatcher#doCommit</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> <span class="keyword">void</span> <span class="title">doCommit</span><span class="params">()</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">    <span class="keyword">if</span> (DLedgerUtils.elapsed(lastPushCommitTimeMs) &gt; <span class="number">1000</span>) &#123;   <span class="comment">// @1</span></span><br><span class="line">        PushEntryRequest request = buildPushRequest(<span class="keyword">null</span>, PushEntryRequest.Type.COMMIT);   <span class="comment">// @2</span></span><br><span class="line">        <span class="comment">//Ignore the results</span></span><br><span class="line">        dLedgerRpcService.push(request);                                                                                        <span class="comment">// @3</span></span><br><span class="line">        lastPushCommitTimeMs = System.currentTimeMillis();</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：如果上一次单独发送 commit 的请求时间与当前时间相隔低于 1s，放弃本次提交请求。</p>
<p>代码@2：构建提交请求。</p>
<p>代码@3：通过网络向从节点发送 commit 请求。</p>
<p>接下来先了解一下如何构建 commit 请求包。</p>
<p>EntryDispatcher#buildPushRequest</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> PushEntryRequest <span class="title">buildPushRequest</span><span class="params">(DLedgerEntry entry, PushEntryRequest.Type target)</span> </span>&#123;</span><br><span class="line">    PushEntryRequest request = <span class="keyword">new</span> PushEntryRequest();</span><br><span class="line">    request.setGroup(memberState.getGroup());  </span><br><span class="line">    request.setRemoteId(peerId);                          </span><br><span class="line">    request.setLeaderId(leaderId);</span><br><span class="line">    request.setTerm(term);</span><br><span class="line">    request.setEntry(entry);</span><br><span class="line">    request.setType(target);</span><br><span class="line">    request.setCommitIndex(dLedgerStore.getCommittedIndex());</span><br><span class="line">    <span class="keyword">return</span> request;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>提交包请求字段主要包含如下字段：DLedger 节点所属组、从节点 id、主节点 id，当前投票轮次、日志内容、请求类型与 committedIndex(主节点已提交日志序号)。</p>
<h5 id="2-3-2-2-doCheckAppendResponse-检查并追加请求"><a href="#2-3-2-2-doCheckAppendResponse-检查并追加请求" class="headerlink" title="2.3.2.2 doCheckAppendResponse 检查并追加请求"></a>2.3.2.2 doCheckAppendResponse 检查并追加请求</h5><p>EntryDispatcher#doCheckAppendResponse</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> <span class="keyword">void</span> <span class="title">doCheckAppendResponse</span><span class="params">()</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">    <span class="keyword">long</span> peerWaterMark = getPeerWaterMark(term, peerId);   <span class="comment">// @1</span></span><br><span class="line">    Long sendTimeMs = pendingMap.get(peerWaterMark + <span class="number">1</span>); </span><br><span class="line">    <span class="keyword">if</span> (sendTimeMs != <span class="keyword">null</span> &amp;&amp; System.currentTimeMillis() - sendTimeMs &gt; dLedgerConfig.getMaxPushTimeOutMs()) &#123; <span class="comment">// @2</span></span><br><span class="line">        logger.warn(<span class="string">&quot;[Push-&#123;&#125;]Retry to push entry at &#123;&#125;&quot;</span>, peerId, peerWaterMark + <span class="number">1</span>);</span><br><span class="line">        doAppendInner(peerWaterMark + <span class="number">1</span>);</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>该方法的作用是检查 append 请求是否超时，其关键实现如下：</p>
<ul>
<li>获取已成功 append 的序号。</li>
<li>从挂起的请求队列中获取下一条的发送时间，如果不为空并去超过了 append 的超时时间，则再重新发送 append 请求，最大超时时间默认为 1s，可以通过 maxPushTimeOutMs 来改变默认值。</li>
</ul>
<h5 id="2-3-2-3-doAppendInner-追加请求"><a href="#2-3-2-3-doAppendInner-追加请求" class="headerlink" title="2.3.2.3 doAppendInner 追加请求"></a>2.3.2.3 doAppendInner 追加请求</h5><p>向从节点发送 append 请求。</p>
<p>EntryDispatcher#doAppendInner</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> <span class="keyword">void</span> <span class="title">doAppendInner</span><span class="params">(<span class="keyword">long</span> index)</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">    DLedgerEntry entry = dLedgerStore.get(index);   <span class="comment">// @1</span></span><br><span class="line">    PreConditions.check(entry != <span class="keyword">null</span>, DLedgerResponseCode.UNKNOWN, <span class="string">&quot;writeIndex=%d&quot;</span>, index);</span><br><span class="line">    checkQuotaAndWait(entry);                                   <span class="comment">// @2</span></span><br><span class="line">    PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.APPEND);   <span class="comment">// @3</span></span><br><span class="line">    CompletableFuture&lt;PushEntryResponse&gt; responseFuture = dLedgerRpcService.push(request);   <span class="comment">// @4</span></span><br><span class="line">    pendingMap.put(index, System.currentTimeMillis());                                                                          <span class="comment">// @5</span></span><br><span class="line">    responseFuture.whenComplete((x, ex) -&gt; &#123;</span><br><span class="line">        <span class="keyword">try</span> &#123;</span><br><span class="line">            PreConditions.check(ex == <span class="keyword">null</span>, DLedgerResponseCode.UNKNOWN);</span><br><span class="line">            DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());</span><br><span class="line">            <span class="keyword">switch</span> (responseCode) &#123;</span><br><span class="line">                <span class="keyword">case</span> SUCCESS:                                                                                                                <span class="comment">// @6</span></span><br><span class="line">                    pendingMap.remove(x.getIndex());</span><br><span class="line">                    updatePeerWaterMark(x.getTerm(), peerId, x.getIndex());</span><br><span class="line">                    quorumAckChecker.wakeup();</span><br><span class="line">                    <span class="keyword">break</span>;</span><br><span class="line">                <span class="keyword">case</span> INCONSISTENT_STATE:                                                                                         <span class="comment">// @7</span></span><br><span class="line">                    logger.info(<span class="string">&quot;[Push-&#123;&#125;]Get INCONSISTENT_STATE when push index=&#123;&#125; term=&#123;&#125;&quot;</span>, peerId, x.getIndex(), x.getTerm());</span><br><span class="line">                    changeState(-<span class="number">1</span>, PushEntryRequest.Type.COMPARE);</span><br><span class="line">                    <span class="keyword">break</span>;</span><br><span class="line">                <span class="keyword">default</span>:</span><br><span class="line">                    logger.warn(<span class="string">&quot;[Push-&#123;&#125;]Get error response code &#123;&#125; &#123;&#125;&quot;</span>, peerId, responseCode, x.baseInfo());</span><br><span class="line">                    <span class="keyword">break</span>;</span><br><span class="line">            &#125;</span><br><span class="line">        &#125; <span class="keyword">catch</span> (Throwable t) &#123;</span><br><span class="line">            logger.error(<span class="string">&quot;&quot;</span>, t);</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;);</span><br><span class="line">    lastPushCommitTimeMs = System.currentTimeMillis();</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：首先根据序号查询出日志。</p>
<p>代码@2：检测配额，如果超过配额，会进行一定的限流，其关键实现点：</p>
<ul>
<li>首先触发条件：append 挂起请求数已超过最大允许挂起数；基于文件存储并主从差异超过300m，可通过 peerPushThrottlePoint 配置。</li>
<li>每秒追加的日志超过 20m(可通过 peerPushQuota 配置)，则会 sleep 1s中后再追加。</li>
</ul>
<p>代码@3：构建 PUSH  请求日志。</p>
<p>代码@4：通过 Netty 发送网络请求到从节点，从节点收到请求会进行处理(本文并不会探讨与网络相关的实现细节)。</p>
<p>代码@5：用 pendingMap 记录待追加的日志的发送时间，用于发送端判断是否超时的一个依据。</p>
<p>代码@6：请求成功的处理逻辑，其关键实现点如下：</p>
<ul>
<li>移除 pendingMap 中的关于该日志的发送超时时间。</li>
<li>更新已成功追加的日志序号(按投票轮次组织，并且每个从服务器一个键值对)。</li>
<li>唤醒 quorumAckChecker 线程(主要用于仲裁 append 结果)，后续会详细介绍。</li>
</ul>
<p>代码@7：Push 请求出现状态不一致情况，将发送 COMPARE 请求，来对比主从节点的数据是否一致。</p>
<p>日志转发 append 追加请求类型就介绍到这里了，接下来我们继续探讨另一个请求类型 compare。</p>
<h4 id="2-3-3-compare-请求详解"><a href="#2-3-3-compare-请求详解" class="headerlink" title="2.3.3  compare 请求详解"></a>2.3.3  compare 请求详解</h4><p>COMPARE 类型的请求有 doCompare 方法发送，首先该方法运行在 while (true) 中，故在查阅下面代码时，要注意其退出循环的条件。<br>EntryDispatcher#doCompare</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (!checkAndFreshState()) &#123;</span><br><span class="line">    <span class="keyword">break</span>;</span><br><span class="line">&#125;</span><br><span class="line"><span class="keyword">if</span> (type.get() != PushEntryRequest.Type.COMPARE</span><br><span class="line">    &amp;&amp; type.get() != PushEntryRequest.Type.TRUNCATE) &#123;</span><br><span class="line">    <span class="keyword">break</span>;</span><br><span class="line">&#125;</span><br><span class="line"><span class="keyword">if</span> (compareIndex == -<span class="number">1</span> &amp;&amp; dLedgerStore.getLedgerEndIndex() == -<span class="number">1</span>) &#123;</span><br><span class="line">    <span class="keyword">break</span>;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step1：验证是否执行，有几个关键点如下：</p>
<ul>
<li>判断是否是主节点，如果不是主节点，则直接跳出。</li>
<li>如果是请求类型不是 COMPARE 或 TRUNCATE 请求，则直接跳出。</li>
<li>如果已比较索引 和 ledgerEndIndex 都为 -1 ，表示一个新的 DLedger 集群，则直接跳出。</li>
</ul>
<p>EntryDispatcher#doCompare</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (compareIndex == -<span class="number">1</span>) &#123;</span><br><span class="line">    compareIndex = dLedgerStore.getLedgerEndIndex();</span><br><span class="line">    logger.info(<span class="string">&quot;[Push-&#123;&#125;][DoCompare] compareIndex=-1 means start to compare&quot;</span>, peerId);</span><br><span class="line">&#125; <span class="keyword">else</span> <span class="keyword">if</span> (compareIndex &gt; dLedgerStore.getLedgerEndIndex() || compareIndex &lt; dLedgerStore.getLedgerBeginIndex()) &#123;</span><br><span class="line">    logger.info(<span class="string">&quot;[Push-&#123;&#125;][DoCompare] compareIndex=&#123;&#125; out of range &#123;&#125;-&#123;&#125;&quot;</span>, peerId, compareIndex, dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex());</span><br><span class="line">    compareIndex = dLedgerStore.getLedgerEndIndex();</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step2：如果 compareIndex 为 -1 或compareIndex 不在有效范围内，则重置待比较序列号为当前已已存储的最大日志序号：ledgerEndIndex。</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br></pre></td><td class="code"><pre><span class="line">DLedgerEntry entry = dLedgerStore.get(compareIndex);</span><br><span class="line">PreConditions.check(entry != <span class="keyword">null</span>, DLedgerResponseCode.INTERNAL_ERROR, <span class="string">&quot;compareIndex=%d&quot;</span>, compareIndex);</span><br><span class="line">PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.COMPARE);</span><br><span class="line">CompletableFuture&lt;PushEntryResponse&gt; responseFuture = dLedgerRpcService.push(request);</span><br><span class="line">PushEntryResponse response = responseFuture.get(<span class="number">3</span>, TimeUnit.SECONDS);</span><br></pre></td></tr></table></figure>
<p>Step3：根据序号查询到日志，并向从节点发起 COMPARE 请求，其超时时间为 3s。</p>
<p>EntryDispatcher#doCompare</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">long</span> truncateIndex = -<span class="number">1</span>;</span><br><span class="line"><span class="keyword">if</span> (response.getCode() == DLedgerResponseCode.SUCCESS.getCode()) &#123;   <span class="comment">// @1</span></span><br><span class="line">    <span class="keyword">if</span> (compareIndex == response.getEndIndex()) &#123;</span><br><span class="line">        changeState(compareIndex, PushEntryRequest.Type.APPEND);</span><br><span class="line">        <span class="keyword">break</span>;</span><br><span class="line">    &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">        truncateIndex = compareIndex;</span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line">&#125; <span class="keyword">else</span> <span class="keyword">if</span> (response.getEndIndex() &lt; dLedgerStore.getLedgerBeginIndex() </span><br><span class="line">        || response.getBeginIndex() &gt; dLedgerStore.getLedgerEndIndex()) &#123;    <span class="comment">// @2</span></span><br><span class="line">    truncateIndex = dLedgerStore.getLedgerBeginIndex();</span><br><span class="line">&#125; <span class="keyword">else</span> <span class="keyword">if</span> (compareIndex &lt; response.getBeginIndex()) &#123;                                    <span class="comment">// @3</span></span><br><span class="line">    truncateIndex = dLedgerStore.getLedgerBeginIndex();</span><br><span class="line">&#125; <span class="keyword">else</span> <span class="keyword">if</span> (compareIndex &gt; response.getEndIndex()) &#123;                                      <span class="comment">// @4</span></span><br><span class="line">    compareIndex = response.getEndIndex();</span><br><span class="line">&#125; <span class="keyword">else</span> &#123;                                                                                                              <span class="comment">// @5</span></span><br><span class="line">	compareIndex--;</span><br><span class="line">&#125;</span><br><span class="line"></span><br><span class="line"><span class="keyword">if</span> (compareIndex &lt; dLedgerStore.getLedgerBeginIndex()) &#123;                          <span class="comment">// @6</span></span><br><span class="line">    truncateIndex = dLedgerStore.getLedgerBeginIndex();</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step4：根据响应结果计算需要截断的日志序号，其主要实现关键点如下：</p>
<ul>
<li>代码@1：如果两者的日志序号相同，则无需截断，下次将直接先从节点发送 append 请求；否则将 truncateIndex  设置为响应结果中的 endIndex。</li>
<li>代码@2：如果从节点存储的最大日志序号小于主节点的最小序号，或者从节点的最小日志序号大于主节点的最大日志序号，即两者不相交，这通常发生在从节点崩溃很长一段时间，而主节点删除了过期的条目时。truncateIndex 设置为主节点的 ledgerBeginIndex，即主节点目前最小的偏移量。</li>
<li>代码@3：如果已比较的日志序号小于从节点的开始日志序号，很可能是从节点磁盘发送损耗，从主节点最小日志序号开始同步。</li>
<li>代码@4：如果已比较的日志序号大于从节点的最大日志序号，则已比较索引设置为从节点最大的日志序号，触发数据的继续同步。</li>
<li>代码@5：如果已比较的日志序号大于从节点的开始日志序号，但小于从节点的最大日志序号，则待比较索引减一。</li>
<li>代码@6：如果比较出来的日志序号小于主节点的最小日志需要，则设置为主节点的最小序号。</li>
</ul>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (truncateIndex != -<span class="number">1</span>) &#123;</span><br><span class="line">    changeState(truncateIndex, PushEntryRequest.Type.TRUNCATE);</span><br><span class="line">    doTruncate(truncateIndex);</span><br><span class="line">    <span class="keyword">break</span>;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step5：如果比较出来的日志序号不等于 -1 ，则向从节点发送 TRUNCATE 请求。</p>
<h5 id="2-3-3-1-doTruncate-详解"><a href="#2-3-3-1-doTruncate-详解" class="headerlink" title="2.3.3.1 doTruncate 详解"></a>2.3.3.1 doTruncate 详解</h5><figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> <span class="keyword">void</span> <span class="title">doTruncate</span><span class="params">(<span class="keyword">long</span> truncateIndex)</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">    PreConditions.check(type.get() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);</span><br><span class="line">    DLedgerEntry truncateEntry = dLedgerStore.get(truncateIndex);</span><br><span class="line">    PreConditions.check(truncateEntry != <span class="keyword">null</span>, DLedgerResponseCode.UNKNOWN);</span><br><span class="line">    logger.info(<span class="string">&quot;[Push-&#123;&#125;]Will push data to truncate truncateIndex=&#123;&#125; pos=&#123;&#125;&quot;</span>, peerId, truncateIndex, truncateEntry.getPos());</span><br><span class="line">    PushEntryRequest truncateRequest = buildPushRequest(truncateEntry, PushEntryRequest.Type.TRUNCATE);</span><br><span class="line">    PushEntryResponse truncateResponse = dLedgerRpcService.push(truncateRequest).get(<span class="number">3</span>, TimeUnit.SECONDS);</span><br><span class="line">    PreConditions.check(truncateResponse != <span class="keyword">null</span>, DLedgerResponseCode.UNKNOWN, <span class="string">&quot;truncateIndex=%d&quot;</span>, truncateIndex);</span><br><span class="line">    PreConditions.check(truncateResponse.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(truncateResponse.getCode()), <span class="string">&quot;truncateIndex=%d&quot;</span>, truncateIndex);</span><br><span class="line">    lastPushCommitTimeMs = System.currentTimeMillis();</span><br><span class="line">    changeState(truncateIndex, PushEntryRequest.Type.APPEND);</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>该方法主要就是构建 truncate 请求到从节点。</p>
<p>关于服务端的消息复制转发就介绍到这里了，主节点负责向从服务器PUSH请求，从节点自然而然的要处理这些请求，接下来我们就按照主节点发送的请求，来具体分析一下从节点是如何响应的。</p>
<h2 id="3、EntryHandler-详解"><a href="#3、EntryHandler-详解" class="headerlink" title="3、EntryHandler 详解"></a>3、EntryHandler 详解</h2><p>EntryHandler 同样是一个线程，当节点状态为从节点时激活。</p>
<h3 id="3-1-核心类图"><a href="#3-1-核心类图" class="headerlink" title="3.1 核心类图"></a>3.1 核心类图</h3><p><img src="https://img-blog.csdnimg.cn/20190914220450532.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>其核心属性如下：</p>
<ul>
<li>long lastCheckFastForwardTimeMs<br>上一次检查主服务器是否有 push 消息的时间戳。</li>
<li>ConcurrentMap&lt;Long, Pair&lt;PushEntryRequest, CompletableFuture&lt; PushEntryResponse&gt;&gt;&gt; writeRequestMap<br>append 请求处理队列。</li>
<li>BlockingQueue&lt;Pair&lt;PushEntryRequest, CompletableFuture&lt; PushEntryResponse&gt;&gt;&gt; compareOrTruncateRequests<br>COMMIT、COMPARE、TRUNCATE 相关请求</li>
</ul>
<h3 id="3-2-handlePush"><a href="#3-2-handlePush" class="headerlink" title="3.2 handlePush"></a>3.2 handlePush</h3><p>从上文得知，主节点会主动向从节点传播日志，从节点会通过网络接受到请求数据进行处理，其调用链如图所示：<br><img src="https://img-blog.csdnimg.cn/20190914220604630.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>最终会调用 EntryHandler 的 handlePush 方法。</p>
<p>EntryHandler#handlePush</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> CompletableFuture&lt;PushEntryResponse&gt; <span class="title">handlePush</span><span class="params">(PushEntryRequest request)</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">    <span class="comment">//The timeout should smaller than the remoting layer&#x27;s request timeout</span></span><br><span class="line">    CompletableFuture&lt;PushEntryResponse&gt; future = <span class="keyword">new</span> TimeoutFuture&lt;&gt;(<span class="number">1000</span>);      <span class="comment">// @1</span></span><br><span class="line">    <span class="keyword">switch</span> (request.getType()) &#123;</span><br><span class="line">        <span class="keyword">case</span> APPEND:                                                                                                          <span class="comment">// @2</span></span><br><span class="line">            PreConditions.check(request.getEntry() != <span class="keyword">null</span>, DLedgerResponseCode.UNEXPECTED_ARGUMENT);</span><br><span class="line">            <span class="keyword">long</span> index = request.getEntry().getIndex();</span><br><span class="line">            Pair&lt;PushEntryRequest, CompletableFuture&lt;PushEntryResponse&gt;&gt; old = writeRequestMap.putIfAbsent(index, <span class="keyword">new</span> Pair&lt;&gt;(request, future));</span><br><span class="line">            <span class="keyword">if</span> (old != <span class="keyword">null</span>) &#123;</span><br><span class="line">                logger.warn(<span class="string">&quot;[MONITOR]The index &#123;&#125; has already existed with &#123;&#125; and curr is &#123;&#125;&quot;</span>, index, old.getKey().baseInfo(), request.baseInfo());</span><br><span class="line">                future.complete(buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode()));</span><br><span class="line">            &#125;</span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">        <span class="keyword">case</span> COMMIT:                                                                                                           <span class="comment">// @3</span></span><br><span class="line">            compareOrTruncateRequests.put(<span class="keyword">new</span> Pair&lt;&gt;(request, future));</span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">        <span class="keyword">case</span> COMPARE:</span><br><span class="line">        <span class="keyword">case</span> TRUNCATE:                                                                                                     <span class="comment">// @4</span></span><br><span class="line">            PreConditions.check(request.getEntry() != <span class="keyword">null</span>, DLedgerResponseCode.UNEXPECTED_ARGUMENT);</span><br><span class="line">            writeRequestMap.clear();</span><br><span class="line">            compareOrTruncateRequests.put(<span class="keyword">new</span> Pair&lt;&gt;(request, future));</span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">        <span class="keyword">default</span>:</span><br><span class="line">            logger.error(<span class="string">&quot;[BUG]Unknown type &#123;&#125; from &#123;&#125;&quot;</span>, request.getType(), request.baseInfo());</span><br><span class="line">            future.complete(buildResponse(request, DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));</span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="keyword">return</span> future;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>从几点处理主节点的 push 请求，其实现关键点如下。</p>
<p>代码@1：首先构建一个响应结果Future，默认超时时间 1s。</p>
<p>代码@2：如果是 APPEND 请求，放入到 writeRequestMap 集合中，如果已存在该数据结构，说明主节点重复推送，构建返回结果，其状态码为 REPEATED_PUSH。放入到 writeRequestMap 中，由 doWork 方法定时去处理待写入的请求。</p>
<p>代码@3：如果是提交请求， 将请求存入 compareOrTruncateRequests 请求处理中，由 doWork 方法异步处理。</p>
<p>代码@4：如果是 COMPARE 或 TRUNCATE 请求，将待写入队列 writeRequestMap  清空，并将请求放入 compareOrTruncateRequests 请求队列中，由 doWork 方法异步处理。</p>
<p>接下来，我们重点来分析 doWork 方法的实现。</p>
<h3 id="3-3-doWork-方法详解"><a href="#3-3-doWork-方法详解" class="headerlink" title="3.3 doWork 方法详解"></a>3.3 doWork 方法详解</h3><p>EntryHandler#doWork</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">doWork</span><span class="params">()</span> </span>&#123;</span><br><span class="line">    <span class="keyword">try</span> &#123;</span><br><span class="line">        <span class="keyword">if</span> (!memberState.isFollower()) &#123;     <span class="comment">// @1</span></span><br><span class="line">            waitForRunning(<span class="number">1</span>);</span><br><span class="line">            <span class="keyword">return</span>;</span><br><span class="line">        &#125;</span><br><span class="line">        <span class="keyword">if</span> (compareOrTruncateRequests.peek() != <span class="keyword">null</span>) &#123;    <span class="comment">// @2</span></span><br><span class="line">            Pair&lt;PushEntryRequest, CompletableFuture&lt;PushEntryResponse&gt;&gt; pair = compareOrTruncateRequests.poll();</span><br><span class="line">            PreConditions.check(pair != <span class="keyword">null</span>, DLedgerResponseCode.UNKNOWN);</span><br><span class="line">            <span class="keyword">switch</span> (pair.getKey().getType()) &#123;</span><br><span class="line">                <span class="keyword">case</span> TRUNCATE:</span><br><span class="line">                    handleDoTruncate(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());</span><br><span class="line">                    <span class="keyword">break</span>;</span><br><span class="line">                <span class="keyword">case</span> COMPARE:</span><br><span class="line">                    handleDoCompare(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());</span><br><span class="line">                    <span class="keyword">break</span>;</span><br><span class="line">                <span class="keyword">case</span> COMMIT:</span><br><span class="line">                    handleDoCommit(pair.getKey().getCommitIndex(), pair.getKey(), pair.getValue());</span><br><span class="line">                    <span class="keyword">break</span>;</span><br><span class="line">                <span class="keyword">default</span>:</span><br><span class="line">                    <span class="keyword">break</span>;</span><br><span class="line">            &#125;</span><br><span class="line">        &#125; <span class="keyword">else</span> &#123; <span class="comment">// @3</span></span><br><span class="line">            <span class="keyword">long</span> nextIndex = dLedgerStore.getLedgerEndIndex() + <span class="number">1</span>;</span><br><span class="line">            Pair&lt;PushEntryRequest, CompletableFuture&lt;PushEntryResponse&gt;&gt; pair = writeRequestMap.remove(nextIndex);</span><br><span class="line">            <span class="keyword">if</span> (pair == <span class="keyword">null</span>) &#123;</span><br><span class="line">                checkAbnormalFuture(dLedgerStore.getLedgerEndIndex());</span><br><span class="line">                waitForRunning(<span class="number">1</span>);</span><br><span class="line">                <span class="keyword">return</span>;</span><br><span class="line">            &#125;</span><br><span class="line">            PushEntryRequest request = pair.getKey();</span><br><span class="line">            handleDoAppend(nextIndex, request, pair.getValue());</span><br><span class="line">        &#125;</span><br><span class="line">    &#125; <span class="keyword">catch</span> (Throwable t) &#123;</span><br><span class="line">        DLedgerEntryPusher.logger.error(<span class="string">&quot;Error in &#123;&#125;&quot;</span>, getName(), t);</span><br><span class="line">        DLedgerUtils.sleep(<span class="number">100</span>);</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：如果当前节点的状态不是从节点，则跳出。</p>
<p>代码@2：如果 compareOrTruncateRequests 队列不为空，说明有COMMIT、COMPARE、TRUNCATE 等请求，这类请求优先处理。值得注意的是这里使用是 peek、poll 等非阻塞方法，然后根据请求的类型，调用对应的方法。稍后详细介绍。</p>
<p>代码@3：如果只有 append 类请求，则根据当前节点最大的消息序号，尝试从 writeRequestMap 容器中，获取下一个消息复制请求(ledgerEndIndex + 1) 为 key 去查找。如果不为空，则执行 doAppend 请求，如果为空，则调用 checkAbnormalFuture 来处理异常情况。</p>
<p>接下来我们来重点分析各个处理细节。</p>
<h4 id="3-3-1-handleDoCommit"><a href="#3-3-1-handleDoCommit" class="headerlink" title="3.3.1 handleDoCommit"></a>3.3.1 handleDoCommit</h4><p>处理提交请求，其处理比较简单，就是调用 DLedgerStore 的 updateCommittedIndex 更新其已提交偏移量，故我们还是具体看一下DLedgerStore 的 updateCommittedIndex 方法。</p>
<p>DLedgerMmapFileStore#updateCommittedIndex</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">updateCommittedIndex</span><span class="params">(<span class="keyword">long</span> term, <span class="keyword">long</span> newCommittedIndex)</span> </span>&#123;   <span class="comment">// @1</span></span><br><span class="line">    <span class="keyword">if</span> (newCommittedIndex == -<span class="number">1</span></span><br><span class="line">            || ledgerEndIndex == -<span class="number">1</span></span><br><span class="line">            || term &lt; memberState.currTerm()</span><br><span class="line">            || newCommittedIndex == <span class="keyword">this</span>.committedIndex) &#123;                               <span class="comment">// @2</span></span><br><span class="line">            <span class="keyword">return</span>;</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="keyword">if</span> (newCommittedIndex &lt; <span class="keyword">this</span>.committedIndex</span><br><span class="line">            || newCommittedIndex &lt; <span class="keyword">this</span>.ledgerBeginIndex) &#123;                             <span class="comment">// @3</span></span><br><span class="line">        logger.warn(<span class="string">&quot;[MONITOR]Skip update committed index for new=&#123;&#125; &lt; old=&#123;&#125; or new=&#123;&#125; &lt; beginIndex=&#123;&#125;&quot;</span>, newCommittedIndex, <span class="keyword">this</span>.committedIndex, newCommittedIndex, <span class="keyword">this</span>.ledgerBeginIndex);</span><br><span class="line">        <span class="keyword">return</span>;</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="keyword">long</span> endIndex = ledgerEndIndex;</span><br><span class="line">    <span class="keyword">if</span> (newCommittedIndex &gt; endIndex) &#123;                                                       <span class="comment">// @4</span></span><br><span class="line">            <span class="comment">//If the node fall behind too much, the committedIndex will be larger than enIndex.</span></span><br><span class="line">        newCommittedIndex = endIndex;</span><br><span class="line">    &#125;</span><br><span class="line">    DLedgerEntry dLedgerEntry = get(newCommittedIndex);                        <span class="comment">// @5                </span></span><br><span class="line">    PreConditions.check(dLedgerEntry != <span class="keyword">null</span>, DLedgerResponseCode.DISK_ERROR);</span><br><span class="line">    <span class="keyword">this</span>.committedIndex = newCommittedIndex;</span><br><span class="line">    <span class="keyword">this</span>.committedPos = dLedgerEntry.getPos() + dLedgerEntry.getSize();     <span class="comment">// @6</span></span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：首先介绍一下方法的参数：</p>
<ul>
<li>long term<br>主节点当前的投票轮次。</li>
<li>long newCommittedIndex:<br>主节点发送日志复制请求时的已提交日志序号。</li>
</ul>
<p>代码@2：如果待更新提交序号为 -1 或 投票轮次小于从节点的投票轮次或主节点投票轮次等于从节点的已提交序号，则直接忽略本次提交动作。</p>
<p>代码@3：如果主节点的已提交日志序号小于从节点的已提交日志序号或待提交序号小于当前节点的最小有效日志序号，则输出警告日志[MONITOR]，并忽略本次提交动作。</p>
<p>代码@4：如果从节点落后主节点太多，则重置 提交索引为从节点当前最大有效日志序号。</p>
<p>代码@5：尝试根据待提交序号从从节点查找数据，如果数据不存在，则抛出 DISK_ERROR 错误。</p>
<p>代码@6：更新 commitedIndex、committedPos 两个指针，DledgerStore会定时将已提交指针刷入 checkpoint 文件，达到持久化 commitedIndex 指针的目的。</p>
<h4 id="3-3-2-handleDoCompare"><a href="#3-3-2-handleDoCompare" class="headerlink" title="3.3.2 handleDoCompare"></a>3.3.2 handleDoCompare</h4><p>处理主节点发送过来的 COMPARE 请求，其实现也比较简单，最终调用 buildResponse 方法构造响应结果。</p>
<p>EntryHandler#buildResponse</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> PushEntryResponse <span class="title">buildResponse</span><span class="params">(PushEntryRequest request, <span class="keyword">int</span> code)</span> </span>&#123;</span><br><span class="line">    PushEntryResponse response = <span class="keyword">new</span> PushEntryResponse();</span><br><span class="line">    response.setGroup(request.getGroup());</span><br><span class="line">    response.setCode(code);</span><br><span class="line">    response.setTerm(request.getTerm());</span><br><span class="line">    <span class="keyword">if</span> (request.getType() != PushEntryRequest.Type.COMMIT) &#123;</span><br><span class="line">        response.setIndex(request.getEntry().getIndex());</span><br><span class="line">    &#125;</span><br><span class="line">    response.setBeginIndex(dLedgerStore.getLedgerBeginIndex());</span><br><span class="line">    response.setEndIndex(dLedgerStore.getLedgerEndIndex());</span><br><span class="line">    <span class="keyword">return</span> response;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>主要也是返回当前从几点的 ledgerBeginIndex、ledgerEndIndex 以及投票轮次，供主节点进行判断比较。</p>
<h4 id="3-3-3-handleDoTruncate"><a href="#3-3-3-handleDoTruncate" class="headerlink" title="3.3.3 handleDoTruncate"></a>3.3.3 handleDoTruncate</h4><p>handleDoTruncate 方法实现比较简单，删除从节点上 truncateIndex 日志序号之后的所有日志，具体调用dLedgerStore 的 truncate 方法，由于其存储与 RocketMQ 的存储设计基本类似故本文就不在详细介绍，简单介绍其实现要点：根据日志序号，去定位到日志文件，如果命中具体的文件，则修改相应的读写指针、刷盘指针等，并将所在在物理文件之后的所有文件删除。大家如有兴趣，可以查阅笔者的《RocketMQ技术内幕》第4章：RocketMQ 存储相关内容。</p>
<h4 id="3-3-4-handleDoAppend"><a href="#3-3-4-handleDoAppend" class="headerlink" title="3.3.4 handleDoAppend"></a>3.3.4 handleDoAppend</h4><figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> <span class="keyword">void</span> <span class="title">handleDoAppend</span><span class="params">(<span class="keyword">long</span> writeIndex, PushEntryRequest request,</span></span></span><br><span class="line"><span class="function"><span class="params">    CompletableFuture&lt;PushEntryResponse&gt; future)</span> </span>&#123;</span><br><span class="line">    <span class="keyword">try</span> &#123;</span><br><span class="line">        PreConditions.check(writeIndex == request.getEntry().getIndex(), DLedgerResponseCode.INCONSISTENT_STATE);</span><br><span class="line">        DLedgerEntry entry = dLedgerStore.appendAsFollower(request.getEntry(), request.getTerm(), request.getLeaderId());</span><br><span class="line">        PreConditions.check(entry.getIndex() == writeIndex, DLedgerResponseCode.INCONSISTENT_STATE);</span><br><span class="line">        future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));</span><br><span class="line">        dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());</span><br><span class="line">    &#125; <span class="keyword">catch</span> (Throwable t) &#123;</span><br><span class="line">        logger.error(<span class="string">&quot;[HandleDoWrite] writeIndex=&#123;&#125;&quot;</span>, writeIndex, t);</span><br><span class="line">        future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>其实现也比较简单，调用DLedgerStore 的 appendAsFollower 方法进行日志的追加，与appendAsLeader 在日志存储部分相同，只是从节点无需再转发日志。</p>
<h4 id="3-3-5-checkAbnormalFuture"><a href="#3-3-5-checkAbnormalFuture" class="headerlink" title="3.3.5 checkAbnormalFuture"></a>3.3.5 checkAbnormalFuture</h4><p>该方法是本节的重点，doWork 的从服务器存储的最大有效日志序号(ledgerEndIndex) + 1 序号，尝试从待写请求中获取不到对应的请求时调用，这种情况也很常见，例如主节点并么有将最新的数据 PUSH 给从节点。接下来我们详细来看看该方法的实现细节。<br>EntryHandler#checkAbnormalFuture</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (DLedgerUtils.elapsed(lastCheckFastForwardTimeMs) &lt; <span class="number">1000</span>) &#123;</span><br><span class="line">    <span class="keyword">return</span>;</span><br><span class="line">&#125;</span><br><span class="line">lastCheckFastForwardTimeMs  = System.currentTimeMillis();</span><br><span class="line"><span class="keyword">if</span> (writeRequestMap.isEmpty()) &#123;</span><br><span class="line">    <span class="keyword">return</span>;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step1：如果上一次检查的时间距现在不到1s，则跳出；如果当前没有积压的append请求，同样跳出，因为可以同样明确的判断出主节点还未推送日志。</p>
<p>EntryHandler#checkAbnormalFuture</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">for</span> (Pair&lt;PushEntryRequest, CompletableFuture&lt;PushEntryResponse&gt;&gt; pair : writeRequestMap.values()) &#123;</span><br><span class="line">    <span class="keyword">long</span> index = pair.getKey().getEntry().getIndex();             <span class="comment">// @1</span></span><br><span class="line">    <span class="comment">//Fall behind</span></span><br><span class="line">    <span class="keyword">if</span> (index &lt;= endIndex) &#123;                                                   <span class="comment">// @2</span></span><br><span class="line">        <span class="keyword">try</span> &#123;</span><br><span class="line">            DLedgerEntry local = dLedgerStore.get(index);</span><br><span class="line">            PreConditions.check(pair.getKey().getEntry().equals(local), DLedgerResponseCode.INCONSISTENT_STATE);</span><br><span class="line">            pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.SUCCESS.getCode()));</span><br><span class="line">            logger.warn(<span class="string">&quot;[PushFallBehind]The leader pushed an entry index=&#123;&#125; smaller than current ledgerEndIndex=&#123;&#125;, maybe the last ack is missed&quot;</span>, index, endIndex);</span><br><span class="line">        &#125; <span class="keyword">catch</span> (Throwable t) &#123;</span><br><span class="line">            logger.error(<span class="string">&quot;[PushFallBehind]The leader pushed an entry index=&#123;&#125; smaller than current ledgerEndIndex=&#123;&#125;, maybe the last ack is missed&quot;</span>, index, endIndex, t);</span><br><span class="line">            pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));</span><br><span class="line">        &#125;</span><br><span class="line">        writeRequestMap.remove(index);</span><br><span class="line">        <span class="keyword">continue</span>;</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="comment">//Just OK</span></span><br><span class="line">    <span class="keyword">if</span> (index ==  endIndex + <span class="number">1</span>) &#123;    <span class="comment">// @3</span></span><br><span class="line">        <span class="comment">//The next entry is coming, just return</span></span><br><span class="line">        <span class="keyword">return</span>;</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="comment">//Fast forward</span></span><br><span class="line">    TimeoutFuture&lt;PushEntryResponse&gt; future  = (TimeoutFuture&lt;PushEntryResponse&gt;) pair.getValue();    <span class="comment">// @4</span></span><br><span class="line">    <span class="keyword">if</span> (!future.isTimeOut()) &#123;</span><br><span class="line">        <span class="keyword">continue</span>;</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="keyword">if</span> (index &lt; minFastForwardIndex) &#123;                                                                                                                <span class="comment">// @5</span></span><br><span class="line">        minFastForwardIndex = index;</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step2：遍历当前待写入的日志追加请求(主服务器推送过来的日志复制请求)，找到需要快速快进的的索引。其关键实现点如下：</p>
<ul>
<li>代码@1：首先获取待写入日志的序号。</li>
<li>代码@2：如果待写入的日志序号小于从节点已追加的日志(endIndex)，并且日志的确已存储在从节点，则返回成功，并输出警告日志【PushFallBehind】，继续监测下一条待写入日志。</li>
<li>代码@3：如果待写入 index 等于 endIndex + 1，则结束循环，因为下一条日志消息已经在待写入队列中，即将写入。</li>
<li>代码@4：如果待写入 index 大于 endIndex + 1，并且未超时，则直接检查下一条待写入日志。</li>
<li>代码@5：如果待写入 index 大于 endIndex + 1，并且已经超时，则记录该索引，使用 minFastForwardIndex 存储。</li>
</ul>
<p>EntryHandler#checkAbnormalFuture</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (minFastForwardIndex == Long.MAX_VALUE) &#123;</span><br><span class="line">    <span class="keyword">return</span>;</span><br><span class="line">&#125;</span><br><span class="line">Pair&lt;PushEntryRequest, CompletableFuture&lt;PushEntryResponse&gt;&gt; pair = writeRequestMap.get(minFastForwardIndex);</span><br><span class="line"><span class="keyword">if</span> (pair == <span class="keyword">null</span>) &#123;</span><br><span class="line">    <span class="keyword">return</span>;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step3：如果未找到需要快速失败的日志序号或 writeRequestMap 中未找到其请求，则直接结束检测。</p>
<p>EntryHandler#checkAbnormalFuture</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br></pre></td><td class="code"><pre><span class="line">logger.warn(<span class="string">&quot;[PushFastForward] ledgerEndIndex=&#123;&#125; entryIndex=&#123;&#125;&quot;</span>, endIndex, minFastForwardIndex);</span><br><span class="line">pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));</span><br></pre></td></tr></table></figure>
<p>Step4：则向主节点报告从节点已经与主节点发生了数据不一致，从节点并没有写入序号 minFastForwardIndex 的日志。如果主节点收到此种响应，将会停止日志转发，转而向各个从节点发送 COMPARE 请求，从而使数据恢复一致。</p>
<p>行为至此，已经详细介绍了主服务器向从服务器发送请求，从服务做出响应，那接下来就来看一下，服务端收到响应结果后的处理，我们要知道主节点会向它所有的从节点传播日志，主节点需要在指定时间内收到超过集群一半节点的确认，才能认为日志写入成功，那我们接下来看一下其实现过程。</p>
<h2 id="4、QuorumAckChecker"><a href="#4、QuorumAckChecker" class="headerlink" title="4、QuorumAckChecker"></a>4、QuorumAckChecker</h2><p>日志复制投票器，一个日志写请求只有得到集群内的的大多数节点的响应，日志才会被提交。</p>
<h3 id="4-1-类图"><a href="#4-1-类图" class="headerlink" title="4.1 类图"></a>4.1 类图</h3><p><img src="https://img-blog.csdnimg.cn/20190914221601709.png" alt="在这里插入图片描述"><br>其核心属性如下：</p>
<ul>
<li>long lastPrintWatermarkTimeMs<br>上次打印水位线的时间戳，单位为毫秒。</li>
<li>long lastCheckLeakTimeMs<br>上次检测泄漏的时间戳，单位为毫秒。</li>
<li>long lastQuorumIndex<br>已投票仲裁的日志序号。</li>
</ul>
<h3 id="4-2-doWork-详解"><a href="#4-2-doWork-详解" class="headerlink" title="4.2 doWork 详解"></a>4.2 doWork 详解</h3><p>QuorumAckChecker#doWork </p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (DLedgerUtils.elapsed(lastPrintWatermarkTimeMs) &gt; <span class="number">3000</span>) &#123;    </span><br><span class="line">    logger.info(<span class="string">&quot;[&#123;&#125;][&#123;&#125;] term=&#123;&#125; ledgerBegin=&#123;&#125; ledgerEnd=&#123;&#125; committed=&#123;&#125; watermarks=&#123;&#125;&quot;</span>,</span><br><span class="line">            memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm));</span><br><span class="line">    lastPrintWatermarkTimeMs = System.currentTimeMillis();</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step1：如果离上一次打印 watermak 的时间超过3s，则打印一下当前的 term、ledgerBegin、ledgerEnd、committed、peerWaterMarksByTerm 这些数据日志。</p>
<p>QuorumAckChecker#doWork</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (!memberState.isLeader()) &#123;   <span class="comment">// @2</span></span><br><span class="line">    waitForRunning(<span class="number">1</span>);</span><br><span class="line">    <span class="keyword">return</span>;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step2：如果当前节点不是主节点，直接返回，不作为。</p>
<p>QuorumAckChecker#doWork</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (pendingAppendResponsesByTerm.size() &gt; <span class="number">1</span>) &#123;   <span class="comment">// @1</span></span><br><span class="line">    <span class="keyword">for</span> (Long term : pendingAppendResponsesByTerm.keySet()) &#123;</span><br><span class="line">        <span class="keyword">if</span> (term == currTerm) &#123;</span><br><span class="line">            <span class="keyword">continue</span>;</span><br><span class="line">        &#125;</span><br><span class="line">        <span class="keyword">for</span> (Map.Entry&lt;Long, TimeoutFuture&lt;AppendEntryResponse&gt;&gt; futureEntry : pendingAppendResponsesByTerm.get(term).entrySet()) &#123;</span><br><span class="line">            AppendEntryResponse response = <span class="keyword">new</span> AppendEntryResponse();</span><br><span class="line">            response.setGroup(memberState.getGroup());</span><br><span class="line">            response.setIndex(futureEntry.getKey());</span><br><span class="line">            response.setCode(DLedgerResponseCode.TERM_CHANGED.getCode());</span><br><span class="line">            response.setLeaderId(memberState.getLeaderId());</span><br><span class="line">            logger.info(<span class="string">&quot;[TermChange] Will clear the pending response index=&#123;&#125; for term changed from &#123;&#125; to &#123;&#125;&quot;</span>, futureEntry.getKey(), term, currTerm);</span><br><span class="line">            futureEntry.getValue().complete(response);</span><br><span class="line">        &#125;</span><br><span class="line">        pendingAppendResponsesByTerm.remove(term);</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br><span class="line"><span class="keyword">if</span> (peerWaterMarksByTerm.size() &gt; <span class="number">1</span>) &#123;</span><br><span class="line">    <span class="keyword">for</span> (Long term : peerWaterMarksByTerm.keySet()) &#123;</span><br><span class="line">        <span class="keyword">if</span> (term == currTerm) &#123;</span><br><span class="line">            <span class="keyword">continue</span>;</span><br><span class="line">        &#125;</span><br><span class="line">        logger.info(<span class="string">&quot;[TermChange] Will clear the watermarks for term changed from &#123;&#125; to &#123;&#125;&quot;</span>, term, currTerm);</span><br><span class="line">        peerWaterMarksByTerm.remove(term);</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step3：清理pendingAppendResponsesByTerm、peerWaterMarksByTerm 中本次投票轮次的数据，避免一些不必要的内存使用。</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br></pre></td><td class="code"><pre><span class="line">Map&lt;String, Long&gt; peerWaterMarks = peerWaterMarksByTerm.get(currTerm);</span><br><span class="line"><span class="keyword">long</span> quorumIndex = -<span class="number">1</span>;</span><br><span class="line"><span class="keyword">for</span> (Long index : peerWaterMarks.values()) &#123;  <span class="comment">// @1</span></span><br><span class="line">    <span class="keyword">int</span> num = <span class="number">0</span>;</span><br><span class="line">    <span class="keyword">for</span> (Long another : peerWaterMarks.values()) &#123;  <span class="comment">// @2</span></span><br><span class="line">        <span class="keyword">if</span> (another &gt;= index) &#123;</span><br><span class="line">            num++;</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="keyword">if</span> (memberState.isQuorum(num) &amp;&amp; index &gt; quorumIndex) &#123;  <span class="comment">// @3</span></span><br><span class="line">        quorumIndex = index;</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br><span class="line">dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);  <span class="comment">// @4</span></span><br></pre></td></tr></table></figure>
<p>Step4：根据各个从节点反馈的进度，进行仲裁，确定已提交序号。为了加深对这段代码的理解，再来啰嗦一下 peerWaterMarks 的作用，存储的是各个从节点当前已成功追加的日志序号。例如一个三节点的 DLedger 集群，peerWaterMarks 数据存储大概如下：</p>
<figure class="highlight"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br></pre></td><td class="code"><pre><span class="line">&#123;</span><br><span class="line">“dledger_group_01_0” : 100,</span><br><span class="line">&quot;dledger_group_01_1&quot; : 101,</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>其中 dledger_group_01_0 为从节点1的ID，当前已复制的序号为 100，而 dledger_group_01_1 为节点2的ID，当前已复制的序号为 101。再加上主节点，如何确定可提交序号呢？</p>
<ul>
<li>代码@1：首先遍历 peerWaterMarks 的 value 集合，即上述示例中的 {100, 101}，用临时变量 index 来表示待投票的日志序号，需要集群内超过半数的节点的已复制序号超过该值，则该日志能被确认提交。</li>
<li>代码@2：遍历 peerWaterMarks 中的所有已提交序号，与当前值进行比较，如果节点的已提交序号大于等于待投票的日志序号(index)，num 加一，表示投赞成票。</li>
<li>代码@3：对 index 进行仲裁，如果超过半数 并且 index 大于 quorumIndex，更新 quorumIndex 的值为 index。quorumIndex 经过遍历的，得出当前最大的可提交日志序号。</li>
<li>代码@4：更新 committedIndex 索引，方便 DLedgerStore 定时将 committedIndex 写入 checkpoint 中。</li>
</ul>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br></pre></td><td class="code"><pre><span class="line">ConcurrentMap&lt;Long, TimeoutFuture&lt;AppendEntryResponse&gt;&gt; responses = pendingAppendResponsesByTerm.get(currTerm);</span><br><span class="line"><span class="keyword">boolean</span> needCheck = <span class="keyword">false</span>;</span><br><span class="line"><span class="keyword">int</span> ackNum = <span class="number">0</span>;</span><br><span class="line"><span class="keyword">if</span> (quorumIndex &gt;= <span class="number">0</span>) &#123;</span><br><span class="line">    <span class="keyword">for</span> (Long i = quorumIndex; i &gt;= <span class="number">0</span>; i--) &#123;  <span class="comment">// @1</span></span><br><span class="line">        <span class="keyword">try</span> &#123;</span><br><span class="line">            CompletableFuture&lt;AppendEntryResponse&gt; future = responses.remove(i);   <span class="comment">// @2</span></span><br><span class="line">            <span class="keyword">if</span> (future == <span class="keyword">null</span>) &#123;                                                                                              <span class="comment">// @3</span></span><br><span class="line">                needCheck = lastQuorumIndex != -<span class="number">1</span> &amp;&amp; lastQuorumIndex != quorumIndex &amp;&amp; i != lastQuorumIndex;</span><br><span class="line">                <span class="keyword">break</span>;</span><br><span class="line">            &#125; <span class="keyword">else</span> <span class="keyword">if</span> (!future.isDone()) &#123;                                                                                <span class="comment">// @4</span></span><br><span class="line">                AppendEntryResponse response = <span class="keyword">new</span> AppendEntryResponse();</span><br><span class="line">                response.setGroup(memberState.getGroup());</span><br><span class="line">                response.setTerm(currTerm);</span><br><span class="line">                response.setIndex(i);</span><br><span class="line">                response.setLeaderId(memberState.getSelfId());</span><br><span class="line">                response.setPos(((AppendFuture) future).getPos());</span><br><span class="line">                future.complete(response);</span><br><span class="line">            &#125;</span><br><span class="line">            ackNum++;                                                                                                      <span class="comment">// @5</span></span><br><span class="line">        &#125; <span class="keyword">catch</span> (Throwable t) &#123;</span><br><span class="line">            logger.error(<span class="string">&quot;Error in ack to index=&#123;&#125; term=&#123;&#125;&quot;</span>, i, currTerm, t);</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step5：处理 quorumIndex 之前的挂起请求，需要发送响应到客户端,其实现步骤：</p>
<ul>
<li><p>代码@1：从 quorumIndex 开始处理，没处理一条，该序号减一，直到大于0或主动退出，请看后面的退出逻辑。</p>
</li>
<li><p>代码@2：responses 中移除该日志条目的挂起请求。</p>
</li>
<li><p>代码@3：如果未找到挂起请求，说明前面挂起的请求已经全部处理完毕，准备退出，退出之前再 设置 needCheck 的值，其依据如下(三个条件必须同时满足)：</p>
<ul>
<li>最后一次仲裁的日志序号不等于-1</li>
<li>并且最后一次不等于本次新仲裁的日志序号</li>
<li>最后一次仲裁的日志序号不等于最后一次仲裁的日志。正常情况一下，条件一、条件二通常为true，但这一条大概率会返回false。</li>
</ul>
</li>
<li><p>代码@4：向客户端返回结果。</p>
</li>
<li><p>代码@5：ackNum，表示本次确认的数量。</p>
</li>
</ul>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (ackNum == <span class="number">0</span>) &#123;</span><br><span class="line">    <span class="keyword">for</span> (<span class="keyword">long</span> i = quorumIndex + <span class="number">1</span>; i &lt; Integer.MAX_VALUE; i++) &#123;</span><br><span class="line">        TimeoutFuture&lt;AppendEntryResponse&gt; future = responses.get(i);</span><br><span class="line">        <span class="keyword">if</span> (future == <span class="keyword">null</span>) &#123;</span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">        &#125; <span class="keyword">else</span> <span class="keyword">if</span> (future.isTimeOut()) &#123;</span><br><span class="line">            AppendEntryResponse response = <span class="keyword">new</span> AppendEntryResponse();</span><br><span class="line">            response.setGroup(memberState.getGroup());</span><br><span class="line">            response.setCode(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode());</span><br><span class="line">            response.setTerm(currTerm);</span><br><span class="line">            response.setIndex(i);</span><br><span class="line">            response.setLeaderId(memberState.getSelfId());</span><br><span class="line">            future.complete(response);</span><br><span class="line">        &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;</span><br><span class="line">    waitForRunning(<span class="number">1</span>);</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step6：如果本次确认的个数为0，则尝试去判断超过该仲裁序号的请求，是否已经超时，如果已超时，则返回超时响应结果。</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (DLedgerUtils.elapsed(lastCheckLeakTimeMs) &gt; <span class="number">1000</span> || needCheck) &#123;</span><br><span class="line">    updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());</span><br><span class="line">    <span class="keyword">for</span> (Map.Entry&lt;Long, TimeoutFuture&lt;AppendEntryResponse&gt;&gt; futureEntry : responses.entrySet()) &#123;</span><br><span class="line">        <span class="keyword">if</span> (futureEntry.getKey() &lt; quorumIndex) &#123;</span><br><span class="line">            AppendEntryResponse response = <span class="keyword">new</span> AppendEntryResponse();</span><br><span class="line">            response.setGroup(memberState.getGroup());</span><br><span class="line">            response.setTerm(currTerm);</span><br><span class="line">            response.setIndex(futureEntry.getKey());</span><br><span class="line">            response.setLeaderId(memberState.getSelfId());</span><br><span class="line">            response.setPos(((AppendFuture) futureEntry.getValue()).getPos());</span><br><span class="line">            futureEntry.getValue().complete(response);</span><br><span class="line">            responses.remove(futureEntry.getKey());</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;</span><br><span class="line">    lastCheckLeakTimeMs = System.currentTimeMillis();</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step7：检查是否发送泄漏。其判断泄漏的依据是如果挂起的请求的日志序号小于已提交的序号，则移除。</p>
<p>Step8：一次日志仲裁就结束了，最后更新 lastQuorumIndex 为本次仲裁的的新的提交值。</p>
</div>

			<script src="https://my.openwrite.cn/js/readmore.js" type="text/javascript"></script>
			<script>
			var isMobile = navigator.userAgent.match(/(phone|pad|pod|iPhone|iPod|ios|iPad|Android|Mobile|BlackBerry|IEMobile|MQQBrowser|JUC|Fennec|wOSBrowser|BrowserNG|WebOS|Symbian|Windows Phone)/i);
			if (!isMobile) {
			    var btw = new BTWPlugin();
			    btw.init({
			        "id": "vip-container",
			        "blogId": "18019-1573088808868-542",
			        "name": "中间件兴趣圈",
			        "qrcode": "https://img-blog.csdnimg.cn/20190314214003962.jpg",
			        "keyword": "more"
			    });
			}
			</script>
		
      
    </div>
    
    
    

    

    

    

    <footer class="post-footer">
      

      
      
      

      
        <div class="post-nav">
          <div class="post-nav-next post-nav-item">
            
              <a href="/posts/d35cbb14.html" rel="next" title="源码分析 RocketMQ DLedger(多副本) 之日志追加流程">
                <i class="fa fa-chevron-left"></i> 源码分析 RocketMQ DLedger(多副本) 之日志追加流程
              </a>
            
          </div>

          <span class="post-nav-divider"></span>

          <div class="post-nav-prev post-nav-item">
            
              <a href="/posts/433e34a1.html" rel="prev" title="源码分析 RocketMQ DLedger 多副本即主从切换实现原理">
                源码分析 RocketMQ DLedger 多副本即主从切换实现原理 <i class="fa fa-chevron-right"></i>
              </a>
            
          </div>
        </div>
      

      
      
    </footer>
  </div>
  
  
  
  </article>



    <div class="post-spread">
      
    </div>
  </div>


          </div>
          


          

  



        </div>
        
          
  
  <div class="sidebar-toggle">
    <div class="sidebar-toggle-line-wrap">
      <span class="sidebar-toggle-line sidebar-toggle-line-first"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-middle"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-last"></span>
    </div>
  </div>

  <aside id="sidebar" class="sidebar">
    
    <div class="sidebar-inner">

      

      
        <ul class="sidebar-nav motion-element">
          <li class="sidebar-nav-toc sidebar-nav-active" data-target="post-toc-wrap">
            文章目录
          </li>
          <li class="sidebar-nav-overview" data-target="site-overview-wrap">
            站点概览
          </li>
        </ul>
      

      <section class="site-overview-wrap sidebar-panel">
        <div class="site-overview">
          <div class="site-author motion-element" itemprop="author" itemscope itemtype="http://schema.org/Person">
            
              <p class="site-author-name" itemprop="name"></p>
              <p class="site-description motion-element" itemprop="description"></p>
          </div>

          <nav class="site-state motion-element">

            
              <div class="site-state-item site-state-posts">
              
                <a href="/archives/">
              
                  <span class="site-state-item-count">139</span>
                  <span class="site-state-item-name">日志</span>
                </a>
              </div>
            

            
              
              
              <div class="site-state-item site-state-categories">
                <a href="/categories/index.html">
                  <span class="site-state-item-count">18</span>
                  <span class="site-state-item-name">分类</span>
                </a>
              </div>
            

            

          </nav>

          

          

          
          

          
          

          

        </div>
      </section>

      
      <!--noindex-->
        <section class="post-toc-wrap motion-element sidebar-panel sidebar-panel-active">
          <div class="post-toc">

            
              
            

            
              <div class="post-toc-content"><ol class="nav"><li class="nav-item nav-level-2"><a class="nav-link" href="#1%E3%80%81DLedgerEntryPusher"><span class="nav-number">1.</span> <span class="nav-text">1、DLedgerEntryPusher</span></a><ol class="nav-child"><li class="nav-item nav-level-3"><a class="nav-link" href="#1-1-%E6%A0%B8%E5%BF%83%E7%B1%BB%E5%9B%BE"><span class="nav-number">1.1.</span> <span class="nav-text">1.1 核心类图</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#1-2-%E6%9E%84%E9%80%A0%E6%96%B9%E6%B3%95"><span class="nav-number">1.2.</span> <span class="nav-text">1.2 构造方法</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#1-3-startup"><span class="nav-number">1.3.</span> <span class="nav-text">1.3 startup</span></a></li></ol></li><li class="nav-item nav-level-2"><a class="nav-link" href="#2%E3%80%81EntryDispatcher-%E8%AF%A6%E8%A7%A3"><span class="nav-number">2.</span> <span class="nav-text">2、EntryDispatcher 详解</span></a><ol class="nav-child"><li class="nav-item nav-level-3"><a class="nav-link" href="#2-1-%E6%A0%B8%E5%BF%83%E7%B1%BB%E5%9B%BE"><span class="nav-number">2.1.</span> <span class="nav-text">2.1 核心类图</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#2-2-Push-%E8%AF%B7%E6%B1%82%E7%B1%BB%E5%9E%8B"><span class="nav-number">2.2.</span> <span class="nav-text">2.2 Push 请求类型</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#2-3-doWork-%E6%96%B9%E6%B3%95%E8%AF%A6%E8%A7%A3"><span class="nav-number">2.3.</span> <span class="nav-text">2.3 doWork 方法详解</span></a><ol class="nav-child"><li class="nav-item nav-level-4"><a class="nav-link" href="#2-3-1-checkAndFreshState-%E8%AF%A6%E8%A7%A3"><span class="nav-number">2.3.1.</span> <span class="nav-text">2.3.1 checkAndFreshState 详解</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#2-3-2-append-%E8%AF%B7%E6%B1%82%E8%AF%A6%E8%A7%A3"><span class="nav-number">2.3.2.</span> <span class="nav-text">2.3.2 append 请求详解</span></a><ol class="nav-child"><li class="nav-item nav-level-5"><a class="nav-link" href="#2-3-2-1-doCommit-%E5%8F%91%E9%80%81%E6%8F%90%E4%BA%A4%E8%AF%B7%E6%B1%82"><span class="nav-number">2.3.2.1.</span> <span class="nav-text">2.3.2.1 doCommit 发送提交请求</span></a></li><li class="nav-item nav-level-5"><a class="nav-link" href="#2-3-2-2-doCheckAppendResponse-%E6%A3%80%E6%9F%A5%E5%B9%B6%E8%BF%BD%E5%8A%A0%E8%AF%B7%E6%B1%82"><span class="nav-number">2.3.2.2.</span> <span class="nav-text">2.3.2.2 doCheckAppendResponse 检查并追加请求</span></a></li><li class="nav-item nav-level-5"><a class="nav-link" href="#2-3-2-3-doAppendInner-%E8%BF%BD%E5%8A%A0%E8%AF%B7%E6%B1%82"><span class="nav-number">2.3.2.3.</span> <span class="nav-text">2.3.2.3 doAppendInner 追加请求</span></a></li></ol></li><li class="nav-item nav-level-4"><a class="nav-link" href="#2-3-3-compare-%E8%AF%B7%E6%B1%82%E8%AF%A6%E8%A7%A3"><span class="nav-number">2.3.3.</span> <span class="nav-text">2.3.3  compare 请求详解</span></a><ol class="nav-child"><li class="nav-item nav-level-5"><a class="nav-link" href="#2-3-3-1-doTruncate-%E8%AF%A6%E8%A7%A3"><span class="nav-number">2.3.3.1.</span> <span class="nav-text">2.3.3.1 doTruncate 详解</span></a></li></ol></li></ol></li></ol></li><li class="nav-item nav-level-2"><a class="nav-link" href="#3%E3%80%81EntryHandler-%E8%AF%A6%E8%A7%A3"><span class="nav-number">3.</span> <span class="nav-text">3、EntryHandler 详解</span></a><ol class="nav-child"><li class="nav-item nav-level-3"><a class="nav-link" href="#3-1-%E6%A0%B8%E5%BF%83%E7%B1%BB%E5%9B%BE"><span class="nav-number">3.1.</span> <span class="nav-text">3.1 核心类图</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#3-2-handlePush"><span class="nav-number">3.2.</span> <span class="nav-text">3.2 handlePush</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#3-3-doWork-%E6%96%B9%E6%B3%95%E8%AF%A6%E8%A7%A3"><span class="nav-number">3.3.</span> <span class="nav-text">3.3 doWork 方法详解</span></a><ol class="nav-child"><li class="nav-item nav-level-4"><a class="nav-link" href="#3-3-1-handleDoCommit"><span class="nav-number">3.3.1.</span> <span class="nav-text">3.3.1 handleDoCommit</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#3-3-2-handleDoCompare"><span class="nav-number">3.3.2.</span> <span class="nav-text">3.3.2 handleDoCompare</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#3-3-3-handleDoTruncate"><span class="nav-number">3.3.3.</span> <span class="nav-text">3.3.3 handleDoTruncate</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#3-3-4-handleDoAppend"><span class="nav-number">3.3.4.</span> <span class="nav-text">3.3.4 handleDoAppend</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#3-3-5-checkAbnormalFuture"><span class="nav-number">3.3.5.</span> <span class="nav-text">3.3.5 checkAbnormalFuture</span></a></li></ol></li></ol></li><li class="nav-item nav-level-2"><a class="nav-link" href="#4%E3%80%81QuorumAckChecker"><span class="nav-number">4.</span> <span class="nav-text">4、QuorumAckChecker</span></a><ol class="nav-child"><li class="nav-item nav-level-3"><a class="nav-link" href="#4-1-%E7%B1%BB%E5%9B%BE"><span class="nav-number">4.1.</span> <span class="nav-text">4.1 类图</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#4-2-doWork-%E8%AF%A6%E8%A7%A3"><span class="nav-number">4.2.</span> <span class="nav-text">4.2 doWork 详解</span></a></li></ol></li></ol></div>
            

          </div>
        </section>
      <!--/noindex-->
      

      

    </div>
  </aside>


        
      </div>
    </main>

    <footer id="footer" class="footer">
      <div class="footer-inner">
        <div class="copyright">&copy; <span itemprop="copyrightYear">2021</span>
  <span class="with-love">
    <i class="fa fa-user"></i>
  </span>
  <span class="author" itemprop="copyrightHolder">中间件兴趣圈</span>

  
</div>


  <div class="powered-by">由 <a class="theme-link" target="_blank" href="https://hexo.io">Hexo</a> 强力驱动</div>



  <span class="post-meta-divider">|</span>



  <div class="theme-info">主题 &mdash; <a class="theme-link" target="_blank" href="https://github.com/iissnan/hexo-theme-next">NexT.Muse</a> v5.1.4</div>




        
<div class="busuanzi-count">
  <script async src="https://dn-lbstatics.qbox.me/busuanzi/2.3/busuanzi.pure.mini.js"></script>

  
    <span class="site-uv">
      <i class="fa fa-user"></i>
      <span class="busuanzi-value" id="busuanzi_value_site_uv"></span>
      
    </span>
  

  
    <span class="site-pv">
      <i class="fa fa-eye"></i>
      <span class="busuanzi-value" id="busuanzi_value_site_pv"></span>
      
    </span>
  
</div>








        
      </div>
    </footer>

    
      <div class="back-to-top">
        <i class="fa fa-arrow-up"></i>
        
      </div>
    

    

  </div>

  

<script type="text/javascript">
  if (Object.prototype.toString.call(window.Promise) !== '[object Function]') {
    window.Promise = null;
  }
</script>









  












  
  
    <script type="text/javascript" src="/lib/jquery/index.js?v=2.1.3"></script>
  

  
  
    <script type="text/javascript" src="/lib/fastclick/lib/fastclick.min.js?v=1.0.6"></script>
  

  
  
    <script type="text/javascript" src="/lib/jquery_lazyload/jquery.lazyload.js?v=1.9.7"></script>
  

  
  
    <script type="text/javascript" src="/lib/velocity/velocity.min.js?v=1.2.1"></script>
  

  
  
    <script type="text/javascript" src="/lib/velocity/velocity.ui.min.js?v=1.2.1"></script>
  

  
  
    <script type="text/javascript" src="/lib/fancybox/source/jquery.fancybox.pack.js?v=2.1.5"></script>
  


  


  <script type="text/javascript" src="/js/src/utils.js?v=5.1.4"></script>

  <script type="text/javascript" src="/js/src/motion.js?v=5.1.4"></script>



  
  

  
  <script type="text/javascript" src="/js/src/scrollspy.js?v=5.1.4"></script>
<script type="text/javascript" src="/js/src/post-details.js?v=5.1.4"></script>



  


  <script type="text/javascript" src="/js/src/bootstrap.js?v=5.1.4"></script>



  


  




	





  





  












  





  

  
  <script src="https://cdn1.lncld.net/static/js/av-core-mini-0.6.4.js"></script>
  <script>AV.initialize("NNEhOL0iOcflg8f1U3HUqiCq-gzGzoHsz", "7kSmkbbb3DktmHALlShDsBUF");</script>
  <script>
    function showTime(Counter) {
      var query = new AV.Query(Counter);
      var entries = [];
      var $visitors = $(".leancloud_visitors");

      $visitors.each(function () {
        entries.push( $(this).attr("id").trim() );
      });

      query.containedIn('url', entries);
      query.find()
        .done(function (results) {
          var COUNT_CONTAINER_REF = '.leancloud-visitors-count';

          if (results.length === 0) {
            $visitors.find(COUNT_CONTAINER_REF).text(0);
            return;
          }

          for (var i = 0; i < results.length; i++) {
            var item = results[i];
            var url = item.get('url');
            var time = item.get('time');
            var element = document.getElementById(url);

            $(element).find(COUNT_CONTAINER_REF).text(time);
          }
          for(var i = 0; i < entries.length; i++) {
            var url = entries[i];
            var element = document.getElementById(url);
            var countSpan = $(element).find(COUNT_CONTAINER_REF);
            if( countSpan.text() == '') {
              countSpan.text(0);
            }
          }
        })
        .fail(function (object, error) {
          console.log("Error: " + error.code + " " + error.message);
        });
    }

    function addCount(Counter) {
      var $visitors = $(".leancloud_visitors");
      var url = $visitors.attr('id').trim();
      var title = $visitors.attr('data-flag-title').trim();
      var query = new AV.Query(Counter);

      query.equalTo("url", url);
      query.find({
        success: function(results) {
          if (results.length > 0) {
            var counter = results[0];
            counter.fetchWhenSave(true);
            counter.increment("time");
            counter.save(null, {
              success: function(counter) {
                var $element = $(document.getElementById(url));
                $element.find('.leancloud-visitors-count').text(counter.get('time'));
              },
              error: function(counter, error) {
                console.log('Failed to save Visitor num, with error message: ' + error.message);
              }
            });
          } else {
            var newcounter = new Counter();
            /* Set ACL */
            var acl = new AV.ACL();
            acl.setPublicReadAccess(true);
            acl.setPublicWriteAccess(true);
            newcounter.setACL(acl);
            /* End Set ACL */
            newcounter.set("title", title);
            newcounter.set("url", url);
            newcounter.set("time", 1);
            newcounter.save(null, {
              success: function(newcounter) {
                var $element = $(document.getElementById(url));
                $element.find('.leancloud-visitors-count').text(newcounter.get('time'));
              },
              error: function(newcounter, error) {
                console.log('Failed to create');
              }
            });
          }
        },
        error: function(error) {
          console.log('Error:' + error.code + " " + error.message);
        }
      });
    }

    $(function() {
      var Counter = AV.Object.extend("Counter");
      if ($('.leancloud_visitors').length == 1) {
        addCount(Counter);
      } else if ($('.post-title-link').length > 1) {
        showTime(Counter);
      }
    });
  </script>



  

  

  
  

  

  

  

</body>
</html>
